/**
 * Copyright (c) 2021 OceanBase
 * OceanBase CE is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *          http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */

#ifndef OCEANBASE_SQL_REWRITE_QUERY_RANGE_
#define OCEANBASE_SQL_REWRITE_QUERY_RANGE_

#include "lib/allocator/ob_allocator.h"
#include "lib/list/ob_list.h"
#include "lib/list/ob_obj_store.h"
#include "lib/container/ob_fixed_array.h"
#include "lib/hash/ob_placement_hashmap.h"
#include "lib/timezone/ob_timezone_info.h"
#include "sql/rewrite/ob_query_range_provider.h"
#include "sql/rewrite/ob_key_part.h"
#include "sql/resolver/ob_schema_checker.h"
#include "sql/parser/ob_item_type.h"

namespace oceanbase {
namespace common {
class ObDataTypeCastParams;
}
namespace sql {
// Raw expression types; this header only refers to them through pointers, so forward declarations suffice.
class ObRawExpr;
class ObConstRawExpr;
class ObOpRawExpr;
// Container aliases used by the ObQueryRange interfaces declared below.
typedef common::ObDList<ObKeyPart> ObKeyPartList;
typedef common::Ob2DArray<common::ObObjParam, common::OB_MALLOC_BIG_BLOCK_SIZE, ObWrapperAllocator, false> ParamsIArray;
typedef common::ObIArray<ObRawExpr*> ExprIArray;
typedef common::ObIArray<ColumnItem> ColumnIArray;

class ObQueryRange : public ObQueryRangeProvider {
  OB_UNIS_VERSION(4);

private:
  // Associates one expression with a list of int64_t positions.
  // NOTE(review): presumably cur_pos_ holds the key column offsets that cur_expr_ constrains -- confirm in the .cpp.
  struct ObRangeExprItem {
    // Not initialized by this struct (no constructor is declared); whoever creates an item must set it.
    const ObRawExpr* cur_expr_;
    common::ObSEArray<int64_t, 16> cur_pos_;
    DECLARE_TO_STRING;
  };

  // Holds the index of a single parameter; starts out as OB_INVALID_ID (converted to int64_t).
  struct ArrayParamInfo {
    ArrayParamInfo() : param_index_(OB_INVALID_ID)
    {}
    TO_STRING_KV(K_(param_index));
    int64_t param_index_;
  };

  // Working state kept in query_range_ctx_ between init_query_range_ctx() and destroy_query_range_ctx().
  struct ObQueryRangeCtx {
    // @param params  parameter array to consult during extraction; may be NULL (stored as-is, not owned).
    // explicit: a bare params pointer must not silently convert into a context object,
    // matching the explicit ObQueryRange(common::ObIAllocator&) constructor.
    explicit ObQueryRangeCtx(const ParamsIArray* params)
        : need_final_extact_(false), cur_expr_is_precise_(false), params_(params)
    {}
    ~ObQueryRangeCtx()
    {}
    // Resets key_part_map_ and need_final_extact_ only. cur_expr_is_precise_, precise_range_exprs_,
    // param_indexs_ and params_ keep whatever values they had.
    void clear()
    {
      key_part_map_.reset();
      need_final_extact_ = false;
    }
    common::hash::ObPlacementHashMap<ObKeyPartId, ObKeyPartPos, 131> key_part_map_;
    bool need_final_extact_;
    bool cur_expr_is_precise_;  // range extracted using current expr is precise, not been enlarged
    common::ObSEArray<ObRangeExprItem, 4, common::ModulePageAllocator, true> precise_range_exprs_;
    common::ObSEArray<int64_t, 16, common::ModulePageAllocator, true> param_indexs_;
    const ParamsIArray* params_;
  };

public:
  // Lifecycle state stored in state_. NEED_INIT is pinned to 0; the remaining values follow in order.
  enum ObQueryRangeState {
    NEED_INIT = 0,
    NEED_TARGET_CND,
    NEED_PREPARE_PARAMS,
    CAN_READ,
  };

  // Tells which operand a row border is taken from (see intersect_border_from / set_partial_row_border).
  enum ObRowBorderType {
    OB_FROM_NONE,
    OB_FROM_LEFT,
    OB_FROM_RIGHT,
  };

  // Classification of a range key.
  // NOTE(review): presumably point lookup / bounded scan / whole range / empty range -- confirm against users.
  enum ObRangeKeyType {
    T_GET,
    T_SCAN,
    T_FULL,
    T_EMPTY,
  };

  // Wraps an ObRangeKeyType so that it can be printed (as an int32_t) through TO_STRING_KV.
  struct ObRangeKeyInfo {
    ObRangeKeyType key_type_;  // no constructor is declared, so this is not initialized by the struct itself
    TO_STRING_KV(N_TYPE, static_cast<int32_t>(key_type_));
  };

  // Non-owning handle around an ObNewRange pointer that provides hash() and operator== by forwarding
  // to the pointed-to range; it is the element type of ObSearchState::range_set_.
  struct ObRangeWrapper {
    common::ObNewRange* range_;

    ObRangeWrapper() : range_(NULL)
    {}

    // Copies the pointer only; the range itself is shared, not duplicated.
    ObRangeWrapper(const ObRangeWrapper& other) : range_(other.range_)
    {}

    // Drops the pointer without releasing the range (the wrapper does not own it).
    ~ObRangeWrapper()
    {
      range_ = NULL;
    }

    // @return range_->hash(), or 0 (after logging a warning) when no range is attached.
    uint64_t hash() const
    {
      uint64_t uval = 0;
      if (NULL == range_) {
        SQL_REWRITE_LOG(WARN, "range_ is not inited.");
      } else {
        uval = range_->hash();
      }
      return uval;
    }

    // Two wrappers are equal when both are empty, or when both hold a range and equal2() says so.
    // A wrapper holding a range never equals an empty one; this case used to dereference
    // other.range_ while it was NULL.
    bool operator==(const ObRangeWrapper& other) const
    {
      bool is_equal = false;
      if (NULL != range_ && NULL != other.range_) {
        is_equal = range_->equal2(*other.range_);
      } else if (NULL == range_ && NULL == other.range_) {
        is_equal = true;
      } else {
        // exactly one side is empty: not equal
      }
      return is_equal;
    }
  };

private:
  // Per-search working state handed to and_first_search(), generate_single_range(), store_range()
  // and get_single_key_value(). The start_/end_/include_start_/include_end_ arrays are indexed by depth_.
  struct ObSearchState {
    // @param allocator  kept by reference in allocator_; it must outlive this object.
    // explicit: an allocator must not silently convert into a search state,
    // matching the explicit ObQueryRange(common::ObIAllocator&) constructor.
    explicit ObSearchState(common::ObIAllocator& allocator)
        : start_(NULL),
          end_(NULL),
          include_start_(NULL),
          include_end_(NULL),
          depth_(0),
          max_exist_index_(0),
          last_include_start_(false),
          last_include_end_(false),
          produce_range_(false),
          is_equal_range_(false),
          is_empty_range_(false),
          allocator_(allocator),
          range_set_()
    {}

    // Checks whether the interval [start, end] (borders included per include_start / include_end)
    // overlaps the interval currently stored at depth_.
    // @return false when any of the four arrays is NULL, when depth_ is negative, or when the two
    //         intervals are disjoint; true otherwise.
    bool has_intersect(const common::ObObj& start, bool include_start, const common::ObObj& end, bool include_end) const
    {
      bool bret = false;
      if (NULL != start_ && NULL != end_ && include_start_ != NULL && include_end_ != NULL && depth_ >= 0) {
        common::ObObj& s1 = start_[depth_];
        common::ObObj& e1 = end_[depth_];
        const bool include_s1 = include_start_[depth_];
        const bool include_e1 = include_end_[depth_];
        // The new start lies beyond the stored end: disjoint, the second comparison is not needed.
        const int cmp_s2_e1 = start.compare(e1);
        bool is_disjoint = (cmp_s2_e1 > 0);
        if (!is_disjoint) {
          const int cmp_e2_s1 = end.compare(s1);
          // Disjoint as well when the new end lies before the stored start, or when the intervals
          // only touch at a border that at least one side excludes.
          is_disjoint = (cmp_e2_s1 < 0) || (0 == cmp_s2_e1 && (!include_start || !include_e1)) ||
                        (0 == cmp_e2_s1 && (!include_end || !include_s1));
        }
        bret = !is_disjoint;
      }
      return bret;
    }

    int intersect(const common::ObObj& start, bool include_start, const common::ObObj& end, bool include_end);
    int tailor_final_range(int64_t column_count);
    common::ObObj* start_;
    common::ObObj* end_;
    bool* include_start_;
    bool* include_end_;
    int depth_;
    int64_t max_exist_index_;
    bool last_include_start_;
    bool last_include_end_;
    bool produce_range_;
    bool is_equal_range_;
    bool is_empty_range_;
    common::ObIAllocator& allocator_;
    common::hash::ObHashSet<ObRangeWrapper, common::hash::NoPthreadDefendMode> range_set_;
  };

  // Head pointer of a key-part graph together with three summary flags about that graph.
  struct ObRangeGraph {
    // An empty condition also counts as a simple range, which is why is_standard_range_ starts as true.
    ObRangeGraph() : key_part_head_(NULL), is_equal_range_(false), is_standard_range_(true), is_precise_get_(false)
    {}

    // Puts every member back to its default-constructed value.
    void reset()
    {
      *this = ObRangeGraph();
    }

    // Copies the head pointer (the graph is shared, not duplicated) and the flags from other.
    // @return always common::OB_SUCCESS
    int assign(const ObRangeGraph& other)
    {
      is_precise_get_ = other.is_precise_get_;
      is_standard_range_ = other.is_standard_range_;
      is_equal_range_ = other.is_equal_range_;
      key_part_head_ = other.key_part_head_;
      return common::OB_SUCCESS;
    }

    ObKeyPart* key_part_head_;
    bool is_equal_range_;
    bool is_standard_range_;
    bool is_precise_get_;
  };

public:
  ObQueryRange();
  // Constructs a query range around a caller-supplied allocator.
  explicit ObQueryRange(common::ObIAllocator& alloc);
  virtual ~ObQueryRange();
  // NOTE(review): copy assignment is declared, but no copy constructor is declared in this class.
  ObQueryRange& operator=(const ObQueryRange& other);

  //  After reset(), the caller needs to re-initialize the query range before using it again.

  void reset();

  //  preliminary_extract_query_range preliminarily extracts the query range
  //  from the query conditions; this only happens while the physical plan is generated.
  //  During this stage some constants are not known yet, for example
  //  prepared params, session variables, global variables, now(), current_timestamp(),
  //  utc_timestamp, etc.

  //  A final extraction may be needed when the physical plan is opened.

  //  Two overloads: a single root expression, or an array of root expressions. params is optional.
  int preliminary_extract_query_range(const ColumnIArray& range_columns, const ObRawExpr* expr_root,
      const common::ObDataTypeCastParams& dtc_params, const ParamsIArray* params = NULL);
  int preliminary_extract_query_range(const ColumnIArray& range_columns, const ExprIArray& root_exprs,
      const common::ObDataTypeCastParams& dtc_params, const ParamsIArray* params = NULL);

  // UNUSED NOW
  int extract_query_range(const ColumnIArray& range_columns, ObRawExpr* expr_root, const ParamsIArray& params,
      const common::ObDataTypeCastParams& dtc_params);
  int extract_query_range(const ColumnIArray& range_columns, const ExprIArray& root_exprs, const ParamsIArray& params,
      const common::ObDataTypeCastParams& dtc_params);

  //  final_extract_query_range extracts the final query range of its physical plan.
  //  It fetches the real-time values of the constants that were unknown while the physical plan was generated.
  //  The query range can not be used until this function has been called.

  int final_extract_query_range(const ParamsIArray& params, const common::ObDataTypeCastParams& dtc_params);

  // get_tablet_ranges gets the ranges of an index.
  // This function can not be used until the physical plan is opened.

  virtual int get_tablet_ranges(
      ObQueryRangeArray& ranges, ObGetMethodArray& get_methods, const common::ObDataTypeCastParams& dtc_params);

  // Const overloads that take the allocator and the parameter array from the caller.
  int get_tablet_ranges(common::ObIAllocator& allocator, const ParamsIArray& params, ObQueryRangeArray& ranges,
      ObGetMethodArray& get_methods, const common::ObDataTypeCastParams& dtc_params,
      common::ObIArray<int64_t>* range_pos = NULL) const;
  int get_tablet_ranges(common::ObIAllocator& allocator, const ParamsIArray& params, ObQueryRangeArray& ranges,
      bool& all_single_value_ranges, const common::ObDataTypeCastParams& dtc_params) const;
  // deep copy query range except the pointer of phy_plan_
  int deep_copy(const ObQueryRange& other);
  // This is a necessary condition only:
  //  true returned: all ranges are get-conditions; be careful, the final range may be (max, min);
  //  false returned: maybe all ranges are scan-conditions,
  //  maybe some get-condition(s) and some scan-condition(s),
  //  or maybe all ranges are get-conditions after final extraction.

  int all_single_value_ranges(bool& all_single_values, const common::ObDataTypeCastParams& dtc_params);
  // USE only in test.
  bool is_precise_whole_range() const
  {
    bool bret = false;
    if (NULL == table_graph_.key_part_head_) {
      bret = true;
    } else if (table_graph_.key_part_head_->is_always_true()) {
      bret = true;
    } else if (table_graph_.key_part_head_->pos_.offset_ > 0) {
      bret = true;
    }
    return bret;
  }

  // XXX: This function may raise problems because of reverse indexes.
  int is_min_to_max_range(bool& is_min_to_max_range, const ObDataTypeCastParams& dtc_params);
  // The predicates below report their answer through the bool out-parameter; the int is the status code.
  int is_at_most_one_row(bool &is_one_row) const;
  int check_is_at_most_one_row(ObKeyPart &key_part, const int64_t depth, const int64_t column_count,
                               bool &bret) const;
  int is_get(bool& is_get) const;
  int is_get(int64_t column_count, bool& is_get) const;
  // Returns the is_precise_get_ flag of the table graph.
  bool is_precise_get() const
  {
    return table_graph_.is_precise_get_;
  }
  // Read-only access to range_exprs_.
  const common::ObIArray<ObRawExpr*>& get_range_exprs() const
  {
    return range_exprs_;
  }
  int check_graph_type();

  // Static check; reports through always_true in addition to the bool result.
  // NOTE(review): the exact meaning of the result and of always_true is defined in the .cpp -- confirm there.
  static bool can_be_extract_range(ObItemType cmp_type, const ObExprResType& col_type, const ObExprCalcType& res_type,
      common::ObObjType data_type, bool& always_true);

  // True means the query range contains a non-standard range graph and needs to be copied
  // from the ObTableScan operator into the physical operator context before the query range is extracted.

  bool need_deep_copy() const;
  // A query range "has a range" as soon as column_count_ is positive.
  inline bool has_range() const
  {
    return column_count_ > 0;
  }
  inline int64_t get_column_count() const
  {
    return column_count_;
  }
  // Read-only access to table_graph_. The name is misspelled ("grapth") but it is public API, so it stays.
  const ObRangeGraph& get_table_grapth() const
  {
    return table_graph_;
  }
  int get_param_value(common::ObObj& val, const ParamsIArray& params) const;
  DECLARE_TO_STRING;

private:
  //  Const worker with the same leading parameters as the public allocator-based get_tablet_ranges() overload.
  //  NOTE(review): presumably that overload delegates to this function -- confirm in the .cpp.
  int inner_get_tablet_ranges(common::ObIAllocator& allocator, const ParamsIArray& params, ObQueryRangeArray& ranges,
      ObGetMethodArray& get_methods, const common::ObDataTypeCastParams& dtc_params) const;
  //  @brief initializes the query range context
  //  @param range_columns[in], columns grouped in range order
  //  @return OB_SUCCESS on success
  int init_query_range_ctx(
      common::ObIAllocator& allocator, const ColumnIArray& range_columns, const ParamsIArray* params);
  //  Counterpart of init_query_range_ctx(); takes the allocator again.
  void destroy_query_range_ctx(common::ObIAllocator& allocator);

  // Key-part builders for a single comparison: each takes the operand expressions plus the comparison
  // type and hands the resulting key part back through out_key_part.
  // @brief escape_expr is only used when cmp_type == T_OP_LIKE

  int get_basic_query_range(const ObRawExpr* l_expr, const ObRawExpr* r_expr, const ObRawExpr* escape_expr,
      ObItemType cmp_type, const ObExprResType& result_type, ObKeyPart*& out_key_part,
      const common::ObDataTypeCastParams& dtc_params);
  int get_const_key_part(const ObRawExpr* l_expr, const ObRawExpr* r_expr, const ObRawExpr* escape_expr,
      ObItemType cmp_type, const ObExprResType& result_type, ObKeyPart*& out_key_part,
      const common::ObDataTypeCastParams& dtc_params);
  int get_column_key_part(const ObRawExpr* l_expr, const ObRawExpr* r_expr, const ObRawExpr* escape_expr,
      ObItemType cmp_type, const ObExprResType& result_type, ObKeyPart*& out_key_part,
      const common::ObDataTypeCastParams& dtc_params);
  // Fills an existing key part (passed by reference) from a comparison type and a value.
  int get_normal_cmp_keypart(ObItemType cmp_type, const common::ObObj& val, ObKeyPart& out_keypart) const;
  int get_row_key_part(const ObRawExpr* l_expr, const ObRawExpr* r_expr, ObItemType cmp_type,
      const ObExprResType& result_type, ObKeyPart*& out_key_part, const common::ObDataTypeCastParams& dtc_params);
  // Append a key part to a row chain / an AND list / an OR list.
  int add_row_item(ObKeyPart*& row_tail, ObKeyPart* key_part);
  int add_and_item(ObKeyPartList& and_storage, ObKeyPart* key_part);
  int add_or_item(ObKeyPartList& or_storage, ObKeyPart* key_part);
  // Preliminary-extraction helpers: each takes an expression node and yields a key part through out_key_part.
  // NOTE(review): the per-operator split (basic cmp / ne / is / between / not between / in / and-or / const)
  // is read off the function names; the exact operators handled are defined in the .cpp.
  int preliminary_extract(const ObRawExpr* node, ObKeyPart*& out_key_part,
      const common::ObDataTypeCastParams& dtc_params, const bool is_single_in = false);
  int pre_extract_basic_cmp(
      const ObRawExpr* node, ObKeyPart*& out_key_part, const common::ObDataTypeCastParams& dtc_params);
  int pre_extract_ne_op(
      const ObOpRawExpr* t_expr, ObKeyPart*& out_key_part, const common::ObDataTypeCastParams& dtc_params);
  int pre_extract_is_op(
      const ObOpRawExpr* t_expr, ObKeyPart*& out_key_part, const common::ObDataTypeCastParams& dtc_params);
  int pre_extract_btw_op(
      const ObOpRawExpr* t_expr, ObKeyPart*& out_key_part, const common::ObDataTypeCastParams& dtc_params);
  int pre_extract_not_btw_op(
      const ObOpRawExpr* t_expr, ObKeyPart*& out_key_part, const common::ObDataTypeCastParams& dtc_params);
  int pre_extract_in_op(
      const ObOpRawExpr* b_expr, ObKeyPart*& out_key_part, const common::ObDataTypeCastParams& dtc_params);
  int pre_extract_single_in_op(
      const ObOpRawExpr* b_expr, ObKeyPart*& out_key_part, const common::ObDataTypeCastParams& dtc_params);
  int pre_extract_and_or_op(
      const ObOpRawExpr* m_expr, ObKeyPart*& out_key_part, const common::ObDataTypeCastParams& dtc_params);
  int pre_extract_const_op(const ObConstRawExpr* node, ObKeyPart*& out_key_part);
  // Looks up a key part id; the position comes back in pos and the verdict in is_key_part.
  int is_key_part(const ObKeyPartId& id, ObKeyPartPos& pos, bool& is_key_part);
  // Helpers that split, intersect (AND) and union (OR) key-part graphs and lists.
  // NOTE(review): semantics are defined in the .cpp; only the signatures are visible here.
  int split_general_or(ObKeyPart* graph, ObKeyPartList& or_storage);
  int split_or(ObKeyPart* graph, ObKeyPartList& or_list);
  int deal_not_align_keypart(ObKeyPart* l_key_part, ObKeyPart* r_key_part, ObKeyPart*& rest);
  int intersect_border_from(const ObKeyPart* l_key_part, const ObKeyPart* r_key_part,
      ObRowBorderType& start_border_type, ObRowBorderType& end_border_type, bool& is_always_false);
  int set_partial_row_border(ObKeyPart* l_gt, ObKeyPart* r_gt, ObRowBorderType start_border_type,
      ObRowBorderType end_border_type, ObKeyPart*& result);
  //  int link_item(ObKeyPart *l_gt, ObKeyPart *r_gt);
  int do_key_part_node_and(ObKeyPart* l_key_part, ObKeyPart* r_key_part, ObKeyPart*& res_key_part);
  int deep_copy_key_part_and_items(const ObKeyPart* src_key_part, ObKeyPart*& dest_key_part);
  int and_single_gt_head_graphs(ObKeyPartList& l_array, ObKeyPartList& r_array, ObKeyPartList& res_array);
  int and_range_graph(ObKeyPartList& ranges, ObKeyPart*& out_key_part);
  int do_row_gt_and(ObKeyPart* l_gt, ObKeyPart* r_gt, ObKeyPart*& res_gt);
  int do_gt_and(ObKeyPart* l_gt, ObKeyPart* r_gt, ObKeyPart*& res_gt);
  int link_or_graphs(ObKeyPartList& storage, ObKeyPart*& out_key_part);
  // Helpers that take the parameter array (reference where required, pointer where optional).
  int definite_key_part(
      ObKeyPart*& key_part, const ParamsIArray& params, const common::ObDataTypeCastParams& dtc_params);
  int replace_questionmark(ObKeyPart* root, const ParamsIArray& params, const common::ObDataTypeCastParams& dtc_params);
  int or_single_head_graphs(
      ObKeyPartList& or_list, const ParamsIArray* params, const common::ObDataTypeCastParams& dtc_params);
  int union_single_equal_cond(ObKeyPartList& ranges, const ParamsIArray* params,
      const common::ObDataTypeCastParams& dtc_params, ObKeyPart* cur1, ObKeyPart* cur2);
  int or_range_graph(ObKeyPartList& ranges, const ParamsIArray* params, ObKeyPart*& out_key_part,
      const common::ObDataTypeCastParams& dtc_params);
  int definite_in_range_graph(
      const ParamsIArray& params, ObKeyPart*& root, bool& has_scan_key, const common::ObDataTypeCastParams& dtc_params);
  // find all single ranges
  int and_first_search(ObSearchState& search_state, ObKeyPart* cur, ObQueryRangeArray& ranges,
      ObGetMethodArray& get_methods, const common::ObDataTypeCastParams& dtc_params);
  // Range producers: the new range comes back through the ObNewRange*& out-parameter.
  inline int generate_single_range(ObSearchState& search_state, int64_t column_num, uint64_t table_id,
      common::ObNewRange*& range, bool& is_get_range) const;
  inline int generate_true_or_false_range(
      const ObKeyPart* cur, common::ObIAllocator& allocator, common::ObNewRange*& range) const;
  int store_range(common::ObNewRange* range, bool is_get_range, ObSearchState& search_state, ObQueryRangeArray& ranges,
      ObGetMethodArray& get_methods);
  // Key-part allocation and graph copying.
  int alloc_empty_key_part(ObKeyPart*& out_key_part);
  int alloc_full_key_part(ObKeyPart*& out_key_part);
  int deep_copy_range_graph(ObKeyPart* src, ObKeyPart*& dest);
  // Serialization of the range graph: whole-graph and single-key-part variants of
  // serialize / deserialize / serialize-size, all working on a (buf, length, pos) triple.
  int serialize_range_graph(
      const ObKeyPart* cur, const ObKeyPart* pre_and_next, char* buf, int64_t buf_len, int64_t& pos) const;
  int serialize_cur_keypart(const ObKeyPart& cur, char* buf, int64_t buf_len, int64_t& pos) const;
  int deserialize_range_graph(ObKeyPart* pre_key, ObKeyPart*& cur, const char* buf, int64_t data_len, int64_t& pos);
  int deserialize_cur_keypart(ObKeyPart*& cur, const char* buf, int64_t data_len, int64_t& pos);
  int64_t get_range_graph_serialize_size(const ObKeyPart* cur, const ObKeyPart* pre_and_next) const;
  int64_t get_cur_keypart_serialize_size(const ObKeyPart& cur) const;
  // Return the new key part directly.
  // NOTE(review): presumably NULL signals allocation failure -- confirm in the .cpp.
  ObKeyPart* create_new_key_part();
  ObKeyPart* deep_copy_key_part(ObKeyPart* key_part);
  int64_t range_graph_to_string(char* buf, const int64_t buf_len, ObKeyPart* key_part) const;
  bool is_get_graph(int deepth, ObKeyPart* key_part);
  // LIKE support: build a key part from a pattern and an escape value.
  int get_like_range(const common::ObObj& pattern, const common::ObObj& escape, ObKeyPart& out_key_part,
      const ObDataTypeCastParams& dtc_params);
  int get_like_const_range(const ObConstRawExpr* text, const ObConstRawExpr* pattern, const ObConstRawExpr* escape,
      common::ObCollationType cmp_cs_type, ObKeyPart*& out_key_part, const common::ObDataTypeCastParams& dtc_params);
  int get_in_expr_res_type(const ObRawExpr* in_expr, int64_t val_idx, ObExprResType& res_type) const;
  // Graph shape predicates.
  // NOTE(review): what "standard", "strict in", "strict equal" and "regular in" mean exactly is defined
  // in the .cpp.
  inline bool is_standard_graph(const ObKeyPart* root) const;
  bool is_strict_in_graph(const ObKeyPart* root, const int64_t start_pos = 0) const;
  int is_strict_equal_graph(
      const ObKeyPart* root, const int64_t cur_pos, int64_t& max_pos, bool& is_strict_equal) const;
  bool is_regular_in_graph(const ObKeyPart* root) const;
  inline int get_single_key_value(const ObKeyPart* key, const ParamsIArray& params, ObSearchState& search_state,
      const common::ObDataTypeCastParams& dtc_params) const;
  inline int gen_simple_get_range(const ObKeyPart& root, common::ObIAllocator& allocator, const ParamsIArray& params,
      ObQueryRangeArray& ranges, ObGetMethodArray& get_methods, const common::ObDataTypeCastParams& dtc_params) const;
  int remove_precise_range_expr(int64_t offset);
  bool is_general_graph(const ObKeyPart& keypart) const;
  bool has_scan_key(const ObKeyPart& keypart) const;
  bool is_min_range_value(const common::ObObj& obj) const;
  bool is_max_range_value(const common::ObObj& obj) const;
  int check_is_get(ObKeyPart& key_part, const int64_t depth, const int64_t column_count, bool& bret) const;
  void check_like_range_precise(
      const ObString& pattern_str, const char* min_str_buf, const size_t min_str_len, const char escape);
  int cast_like_obj_if_needed(const ObObj& string_obj, ObObj& buf_obj, const ObObj*& obj_ptr, ObKeyPart& out_key_part,
      const ObDataTypeCastParams& dtc_params);
  // NOTE(review): presumably these two bound the size of the range graph using MAX_RANGE_SIZE -- confirm in the .cpp.
  int refine_large_range_graph(ObKeyPart *&key_part);
  int compute_range_size(const ObIArray<ObKeyPart*> &key_parts, const ObIArray<uint64_t> &or_count,
      ObIArray<ObKeyPart*> &next_key_parts, ObIArray<uint64_t> &next_or_count, uint64_t &range_size);
private:
  // Sizing constants. NOTE(review): their users live in the .cpp; the meanings below are inferred from the names.
  static const int64_t COMMON_KEY_PART_NUM = 256;  // presumably a typical key part count
  static const int64_t RANGE_BUCKET_SIZE = 1000;   // presumably the bucket count for a range hash container
  static const int64_t MAX_RANGE_SIZE = 10000;     // presumably the upper bound on the number of ranges
  // Store of ObKeyPart pointers that holds its allocator by reference; type of key_part_store_.
  typedef common::ObObjStore<ObKeyPart*, common::ObIAllocator&> KeyPartStore;

private:
  ObRangeGraph table_graph_;
  ObQueryRangeState state_;
  int64_t column_count_;  // has_range() reports true once this is positive
  bool contain_row_;
  // The members below do not need to be serialized.
  common::ObArenaAllocator inner_allocator_;
  // Reference member: must be bound in every constructor's initializer list.
  common::ObIAllocator& allocator_;
  // Set up by init_query_range_ctx() and torn down by destroy_query_range_ctx().
  ObQueryRangeCtx* query_range_ctx_;
  KeyPartStore key_part_store_;
  // Used by the optimizer, so it does not need to be serialized.
  // NOTE(review): the original comment said "this flag", yet the next member is an array -- the comment looks stale.
  common::ObFixedArray<ObRawExpr*, common::ObIAllocator> range_exprs_;
  common::ObFixedArray<int64_t, common::ObIAllocator> param_indexs_;
  friend class ObKeyPart;
};
}  // namespace sql
}  // namespace oceanbase
#endif  // OCEANBASE_SQL_REWRITE_QUERY_RANGE_
